#pragma once

#include <atomic>
#include <expected>

#include <base/UUID.h>
#include <base/defines.h>
#include <pcg_random.hpp>

#include <Common/EventNotifier.h>
#include <Common/ProfileEventsScope.h>
#include <Common/Throttler.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ZooKeeper/ZooKeeperRetries.h>
#include <Common/randomSeed.h>
#include <Core/BackgroundSchedulePool.h>
#include <DataTypes/DataTypesNumber.h>
#include <Interpreters/Cluster.h>
#include <Interpreters/PartLog.h>
#include <Parsers/SyncReplicaMode.h>
#include <QueryPipeline/Pipe.h>
#include <Storages/IStorage.h>
#include <Storages/IStorageCluster.h>
#include <Storages/MergeTree/AsyncBlockIDsCache.h>
#include <Storages/MergeTree/BackgroundJobsAssignee.h>
#include <Storages/MergeTree/DataPartsExchange.h>
#include <Storages/MergeTree/EphemeralLockInZooKeeper.h>
#include <Storages/MergeTree/FutureMergedMutatedPart.h>
#include <Storages/MergeTree/MergeFromLogEntryTask.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeTreeDataMergerMutator.h>
#include <Storages/MergeTree/MergeTreeDataSelectExecutor.h>
#include <Storages/MergeTree/MergeTreeDataWriter.h>
#include <Storages/MergeTree/MergeTreePartsMover.h>
#include <Storages/MergeTree/PartMovesBetweenShardsOrchestrator.h>
#include <Storages/MergeTree/ReplicatedMergeTreeAddress.h>
#include <Storages/MergeTree/ReplicatedMergeTreeAttachThread.h>
#include <Storages/MergeTree/ReplicatedMergeTreeCleanupThread.h>
#include <Storages/MergeTree/ReplicatedMergeTreeLogEntry.h>
#include <Storages/MergeTree/ReplicatedMergeTreeMergeStrategyPicker.h>
#include <Storages/MergeTree/ReplicatedMergeTreePartCheckThread.h>
#include <Storages/MergeTree/ReplicatedMergeTreeQueue.h>
#include <Storages/MergeTree/ReplicatedMergeTreeRestartingThread.h>
#include <Storages/MergeTree/ReplicatedMergeTreeTableMetadata.h>
#include <Storages/MergeTree/ReplicatedTableStatus.h>
#include <Storages/RenamingRestrictions.h>
#include <Storages/TableZnodeInfo.h>


namespace DB
{

/** The engine that uses the merge tree (see MergeTreeData) and is replicated through ZooKeeper.
  *
  * ZooKeeper is used for the following things:
  * - the structure of the table (/metadata, /columns);
  * - the log of actions with data (/log/log-..., /replicas/replica_name/queue/queue-...);
  * - a replica list (/replicas), a replica activity tag (/replicas/replica_name/is_active), and replica addresses (/replicas/replica_name/host);
  * - the leader replica election (/leader_election) - leaders are the replicas that assign merges, mutations
  *   and partition manipulations
  *   (since ClickHouse version 20.5 multiple leaders are allowed to act concurrently);
  * - the set of data parts on each replica (/replicas/replica_name/parts);
  * - the list of the last N blocks of data with checksums, for deduplication (/blocks);
  * - the list of incremental block numbers (/block_numbers) that we are about to insert,
  *   to ensure the linear order of data insertion and data merge only on the intervals in this sequence;
  * - coordination of writes with quorum (/quorum);
  * - storage of mutation entries (ALTER DELETE, ALTER UPDATE, etc.) to execute (/mutations).
  *   See comments in StorageReplicatedMergeTree::mutate() for details.
  */
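
/** For illustration, a sketch of the znode layout implied by the list above (assembled from the
  * responsibilities described there; exact child names may differ between versions):
  *
  *   <zookeeper_path>/metadata, /columns         - table structure
  *   <zookeeper_path>/log/log-...                - shared action log
  *   <zookeeper_path>/blocks, /block_numbers     - deduplication info and allocated block numbers
  *   <zookeeper_path>/quorum, /mutations         - quorum coordination and mutation entries
  *   <zookeeper_path>/leader_election            - leader election
  *   <zookeeper_path>/replicas/<replica_name>/queue/queue-..., /parts, /is_active, /host
  */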

/** The replicated tables have a common log (/log/log-...).
  * Log - a sequence of entries (LogEntry) about what to do.
  * Each entry is one of:
  * - normal data insertion (GET),
  * - data insertion with a possible attach from local data (ATTACH),
  * - merge (MERGE),
  * - delete the partition (DROP).
  *
  * Each replica copies (queueUpdatingTask, pullLogsToQueue) entries from the log to its queue (/replicas/replica_name/queue/queue-...)
  *  and then executes them (queueTask).
  * Despite the name of the "queue", execution can be reordered, if necessary (shouldExecuteLogEntry, executeLogEntry).
  * In addition, the records in the queue can be generated independently (not from the log), in the following cases:
  * - when creating a new replica, actions are put on GET from other replicas (createReplica);
  * - if the part is corrupt (removePartAndEnqueueFetch) or absent during the check
  *   (at startup - checkParts, while running - searchForMissingPart), actions are put on GET from other replicas;
  *
  * The replica on which the INSERT was made will also have a GET entry for this data in its queue.
  * Such an entry is considered to be executed as soon as the queue handler sees it.
  *
  * The log entry has a creation time. This time is generated by the clock of the server that created the entry
  * - the one that received the corresponding INSERT or ALTER query.
  *
  * For the entries that the replica put into its own queue (not copied from the log),
  * the creation time of the corresponding part on any of the replicas is used as this time.
  */
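
/** A rough pseudocode sketch of the flow described above (illustrative only; the real control flow
  * lives in the background tasks declared below, and member names such as log_entry are assumptions):
  *
  *   // queueUpdatingTask(): copy new /log entries into /replicas/<replica_name>/queue
  *   queue.pullLogsToQueue(zookeeper, ...);
  *
  *   // queueTask() / processQueueEntry(): pick an entry (possibly out of order, see shouldExecuteLogEntry)
  *   auto selected_entry = selectQueueEntry();
  *   executeLogEntry(*selected_entry->log_entry);   // GET / ATTACH / MERGE / DROP; re-enqueued on failure
  */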

class ZooKeeperWithFaultInjection;
using ZooKeeperWithFaultInjectionPtr = std::shared_ptr<ZooKeeperWithFaultInjection>;

class StorageReplicatedMergeTree final : public MergeTreeData
{
public:
    /** If not 'attach', either creates a new table in ZK, or adds a replica to an existing table.
      */
    StorageReplicatedMergeTree(
        const TableZnodeInfo & zookeeper_info_,
        LoadingStrictnessLevel mode,
        const StorageID & table_id_,
        const String & relative_data_path_,
        const StorageInMemoryMetadata & metadata_,
        ContextMutablePtr context_,
        const String & date_column_name,
        const MergingParams & merging_params_,
        std::unique_ptr<MergeTreeSettings> settings_,
        bool need_check_structure,
        const ZooKeeperRetriesInfo & create_query_zookeeper_retries_info_);

    void startup() override;

    /// Too many shutdown methods...
    ///
    /// Partial shutdown is called if we lose the connection to ZooKeeper.
    /// The table can also recover after a partial shutdown and continue
    /// to work. This method can be called regularly.
    void partialShutdown();

    /// These two methods are called during final table shutdown (DROP/DETACH/overall server shutdown).
    /// The shutdown process is split into two methods to make it smoother and faster. In a database, shutdown()
    /// looks like:
    /// for (table : tables)
    ///     table->flushAndPrepareForShutdown()
    ///
    /// for (table : tables)
    ///     table->shutdown()
    ///
    /// So we first stop producing parts for all tables (a fast operation), and afterwards we can wait in shutdown()
    /// for other replicas to download parts.
    ///
    /// In flushAndPrepareForShutdown we cancel all part-producing operations:
    /// merges, fetches, moves and so on. If it wasn't called before shutdown() -- shutdown() will
    /// call it (defensive programming).
    void flushAndPrepareForShutdown() override;
    /// In shutdown() we completely terminate the table: remove the
    /// is_active node and the interserver handler. Also optionally
    /// wait until other replicas download some parts from our replica.
    void shutdown(bool is_drop) override;

    ~StorageReplicatedMergeTree() override;

    std::string getName() const override { return "Replicated" + merging_params.getModeName() + "MergeTree"; }

    bool supportsParallelInsert() const override { return true; }
    bool supportsReplication() const override { return true; }
    bool supportsDeduplication() const override { return true; }

    void read(
        QueryPlan & query_plan,
        const Names & column_names,
        const StorageSnapshotPtr & storage_snapshot,
        SelectQueryInfo & query_info,
        ContextPtr local_context,
        QueryProcessingStage::Enum processed_stage,
        size_t max_block_size,
        size_t num_streams) override;

    std::optional<UInt64> totalRows(ContextPtr query_context) const override;
    std::optional<UInt64> totalRowsByPartitionPredicate(const ActionsDAG & filter_actions_dag, ContextPtr context) const override;
    std::optional<UInt64> totalBytes(ContextPtr query_context) const override;
    std::optional<UInt64> totalBytesUncompressed(const Settings & settings) const override;
    MutationCounters getMutationCounters() const override;

    SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context, bool async_insert) override;

    bool optimize(
        const ASTPtr & query,
        const StorageMetadataPtr & metadata_snapshot,
        const ASTPtr & partition,
        bool final,
        bool deduplicate,
        const Names & deduplicate_by_columns,
        bool cleanup,
        ContextPtr query_context) override;

    void alter(const AlterCommands & commands, ContextPtr query_context, AlterLockHolder & table_lock_holder) override;

    void mutate(const MutationCommands & commands, ContextPtr context) override;
    void waitMutation(const String & znode_name, size_t mutations_sync) const;
    std::vector<MergeTreeMutationStatus> getMutationsStatus() const override;
    CancellationCode killMutation(const String & mutation_id) override;

    QueryPipeline updateLightweight(const MutationCommands & commands, ContextPtr query_context) override;
    bool haveCommittingOps(const CommittingBlocks & committing_blocks, PartitionIdToMaxBlockPtr partitions, std::set<CommittingBlock::Op> ops) const;
    void waitForCommittingOpsToFinish(zkutil::ZooKeeperPtr zookeeper, PartitionIdToMaxBlockPtr partitions, std::set<CommittingBlock::Op> ops, size_t backoff_ms, size_t sync_timeout_ms);

    bool hasLightweightDeletedMask() const override;

    /** Removes a replica from ZooKeeper. If there are no other replicas, it deletes the entire table from ZooKeeper.
      */
    void drop() override;

    void truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr query_context, TableExclusiveLockHolder &) override;

    void checkTableCanBeRenamed(const StorageID & new_name) const override;

    void rename(const String & new_path_to_table_data, const StorageID & new_table_id) override;

    void checkTableCanBeDropped([[ maybe_unused ]] ContextPtr query_context) const override;

    ActionLock getActionLock(StorageActionBlockType action_type) override;

    void onActionLockRemove(StorageActionBlockType action_type) override;

    /// Wait until the replication queue's current last entry is processed or until its size becomes 0.
    /// Returns false if the timeout is exceeded.
    bool waitForProcessingQueue(UInt64 max_wait_milliseconds, SyncReplicaMode sync_mode, std::unordered_set<String> source_replicas);

    /// Get the status of the table. If with_zk_fields = false - do not fill in the fields that require queries to ZK.
    void getStatus(ReplicatedTableStatus & res, bool with_zk_fields = true);

    using LogEntriesData = std::vector<ReplicatedMergeTreeLogEntryData>;
    void getQueue(LogEntriesData & res, String & replica_name);

    std::vector<PartMovesBetweenShardsOrchestrator::Entry> getPartMovesBetweenShardsEntries();

    /// Get replica delay relative to current time.
    time_t getAbsoluteDelay() const;

    /// If the absolute delay is greater than min_relative_delay_to_measure,
    /// will also calculate the difference from the unprocessed time of the best replica.
    /// NOTE: Will communicate to ZooKeeper to calculate relative delay.
    void getReplicaDelays(time_t & out_absolute_delay, time_t & out_relative_delay);

    /// Add a part to the queue of parts whose data you want to check in the background thread.
    void enqueuePartForCheck(const String & part_name, time_t delay_to_check_seconds = 0);

    DataValidationTasksPtr getCheckTaskList(const CheckTaskFilter & check_task_filter, ContextPtr context) override;
    std::optional<CheckResult> checkDataNext(DataValidationTasksPtr & check_task_list) override;

    /// Checks the ability to use adaptive granularity
    bool canUseAdaptiveGranularity() const override;

    /// Modify a CREATE TABLE query to make a variant which must be written to a backup.
    void applyMetadataChangesToCreateQueryForBackup(ASTPtr & create_query) const override;

    /// Makes backup entries to backup the data of the storage.
    void backupData(BackupEntriesCollector & backup_entries_collector, const String & data_path_in_backup, const std::optional<ASTs> & partitions) override;

    /// Extract data from the backup and put it to the storage.
    void restoreDataFromBackup(RestorerFromBackup & restorer, const String & data_path_in_backup, const std::optional<ASTs> & partitions) override;

    /** Remove a specific replica from ZooKeeper.
      * Returns true if there are no replicas left.
      */
    static bool dropReplica(zkutil::ZooKeeperPtr zookeeper, const TableZnodeInfo & zookeeper_info,
                            LoggerPtr logger, MergeTreeSettingsPtr table_settings = nullptr, std::optional<bool> * has_metadata_out = nullptr);

    bool dropReplica(const String & drop_replica, LoggerPtr logger);

    /// Removes table from ZooKeeper after the last replica was dropped
    static bool removeTableNodesFromZooKeeper(
        zkutil::ZooKeeperPtr zookeeper, const TableZnodeInfo & zookeeper_info,
        const zkutil::EphemeralNodeHolder::Ptr & metadata_drop_lock, LoggerPtr logger);

    /// Schedules job to execute in background pool (merge, mutate, drop range and so on)
    bool scheduleDataProcessingJob(BackgroundJobsAssignee & assignee) override;

    /// Checks that fetches are not disabled with the action blocker and that the pool for fetches
    /// is not overloaded
    bool canExecuteFetch(const ReplicatedMergeTreeLogEntry & entry, String & disable_reason) const;

    /// Fetch a part only when it is stored on shared storage like S3
    MutableDataPartPtr executeFetchShared(const String & source_replica, const String & new_part_name, const DiskPtr & disk, const String & path);

    /// Lock the part in ZooKeeper so that its shared data can be used by several nodes
    void lockSharedData(const IMergeTreeDataPart & part, bool replace_existing_lock, std::optional<HardlinkedFiles> hardlinked_files) const override;
    void lockSharedData(
        const IMergeTreeDataPart & part,
        const ZooKeeperWithFaultInjectionPtr & zookeeper,
        bool replace_existing_lock,
        std::optional<HardlinkedFiles> hardlinked_files) const;

    void getLockSharedDataOps(
        const IMergeTreeDataPart & part,
        const ZooKeeperWithFaultInjectionPtr & zookeeper,
        bool replace_existing_lock,
        std::optional<HardlinkedFiles> hardlinked_files,
        Coordination::Requests & requests) const;

    zkutil::EphemeralNodeHolderPtr lockSharedDataTemporary(const String & part_name, const String & part_id, const DiskPtr & disk) const;

    /// Unlock the shared data part in ZooKeeper.
    /// Returns true if the data was unlocked.
    /// Returns false if the data is still used by another node.
    std::pair<bool, NameSet> unlockSharedData(const IMergeTreeDataPart & part) const override;
    std::pair<bool, NameSet>
    unlockSharedData(const IMergeTreeDataPart & part, const ZooKeeperWithFaultInjectionPtr & zookeeper) const;

    /// Unlock the shared data part in ZooKeeper by part id.
    /// Returns true if the data was unlocked.
    /// Returns false if the data is still used by another node.
    static std::pair<bool, NameSet> unlockSharedDataByID(
        String part_id,
        const String & table_uuid,
        const MergeTreePartInfo & part_info,
        const String & replica_name_,
        const std::string & disk_type,
        const ZooKeeperWithFaultInjectionPtr & zookeeper_,
        const MergeTreeSettings & settings,
        LoggerPtr logger,
        const String & zookeeper_path_old,
        MergeTreeDataFormatVersion data_format_version,
        const ContextPtr & local_context);

    /// Fetch part only if some replica has it on shared storage like S3
    MutableDataPartPtr tryToFetchIfShared(const IMergeTreeDataPart & part, const DiskPtr & disk, const String & path) override;

    /// Get the best replica having this part on a remote disk of the same type
    String getSharedDataReplica(const IMergeTreeDataPart & part, const DataSourceDescription & data_source_description) const;

    const String & getReplicaName() const { return replica_name; }
    const String & getReplicaPath() const { return replica_path; }

    /// Restores table metadata if ZooKeeper lost it.
    /// Used only on restarted readonly replicas (not checked). All active (Active) parts are moved to detached/
    /// folder and attached. Parts in all other states are just moved to detached/ folder.
    void restoreMetadataInZooKeeper(const ZooKeeperRetriesInfo & zookeeper_retries_info, bool is_called_during_attach);

    /// Get throttler for replicated fetches
    ThrottlerPtr getFetchesThrottler() const
    {
        return replicated_fetches_throttler;
    }

    /// Get throttler for replicated sends
    ThrottlerPtr getSendsThrottler() const
    {
        return replicated_sends_throttler;
    }

    bool createEmptyPartInsteadOfLost(zkutil::ZooKeeperPtr zookeeper, const String & lost_part_name);

    // Return default or custom zookeeper name for table
    const String & getZooKeeperName() const { return zookeeper_info.zookeeper_name; }
    const String & getZooKeeperPath() const { return zookeeper_info.path; }
    const String & getFullZooKeeperPath() const { return zookeeper_info.full_path; }

    // Return table id, common for different replicas
    String getTableSharedID() const override;

    std::map<std::string, MutationCommands> getUnfinishedMutationCommands() const override;

    /// Check if there are new broken disks and enqueue part recovery tasks.
    void checkBrokenDisks();

    static bool removeSharedDetachedPart(DiskPtr disk, const String & path, const String & part_name, const String & table_uuid,
        const String & replica_name, const String & zookeeper_path, const ContextPtr & local_context, const zkutil::ZooKeeperPtr & zookeeper);

    bool canUseZeroCopyReplication() const;

    bool isTableReadOnly() { return is_readonly || isStaticStorage(); }

    std::optional<bool> hasMetadataInZooKeeper() { return has_metadata_in_zookeeper; }

    /// Get a sequentially consistent view of current parts.
    PartitionIdToMaxBlock getMaxAddedBlocks() const;

    void addLastSentPart(const MergeTreePartInfo & info);

    /// Wait the required amount of milliseconds to give other replicas a chance to
    /// download unique parts from our replica
    using ShutdownDeadline = std::chrono::time_point<std::chrono::system_clock>;
    void waitForUniquePartsToBeFetchedByOtherReplicas(ShutdownDeadline shutdown_deadline);

private:
    std::atomic_bool are_restoring_replica {false};

    /// Delete old parts from disk and from ZooKeeper. Returns the number of removed parts
    size_t clearOldPartsAndRemoveFromZK();
    void clearOldPartsAndRemoveFromZKImpl(zkutil::ZooKeeperPtr zookeeper, DataPartsVector && parts);

    template<bool async_insert>
    friend class ReplicatedMergeTreeSinkImpl;
    friend class ReplicatedMergeTreeSinkPatch;
    friend class ReplicatedMergeTreePartCheckThread;
    friend class ReplicatedMergeTreeCleanupThread;
    friend class AsyncBlockIDsCache<StorageReplicatedMergeTree>;
    friend class ReplicatedMergeTreeAlterThread;
    friend class ReplicatedMergeTreeRestartingThread;
    friend class ReplicatedMergeTreeAttachThread;
    friend class ReplicatedMergeTreeMergeStrategyPicker;
    friend struct ReplicatedMergeTreeLogEntry;
    friend class ScopedPartitionMergeLock;
    friend class ReplicatedMergeTreeQueue;
    friend class PartMovesBetweenShardsOrchestrator;
    friend class MergeTreeData;
    friend class MergeFromLogEntryTask;
    friend class MutateFromLogEntryTask;
    friend class ReplicatedMergeMutateTaskBase;

    using MergeStrategyPicker = ReplicatedMergeTreeMergeStrategyPicker;
    using LogEntry = ReplicatedMergeTreeLogEntry;
    using LogEntryPtr = LogEntry::Ptr;

    using MergeTreeData::MutableDataPartPtr;

    zkutil::ZooKeeperPtr current_zookeeper;        /// Use only the methods below.
    mutable std::mutex current_zookeeper_mutex;    /// To recreate the session in the background thread.

    zkutil::ZooKeeperPtr tryGetZooKeeper() const;
    zkutil::ZooKeeperPtr getZooKeeper() const;
    /// Get connection from global context and reconnect if needed.
    /// NOTE: use it only when the table is shut down; in all other cases
    /// use getZooKeeper() because it is managed by the restarting thread,
    /// which guarantees that we have only one connected object
    /// for the table.
    zkutil::ZooKeeperPtr getZooKeeperIfTableShutDown() const;
    zkutil::ZooKeeperPtr getZooKeeperAndAssertNotReadonly() const;
    zkutil::ZooKeeperPtr getZooKeeperAndAssertNotStaticStorage() const;
    void setZooKeeper();
    String getEndpointName() const;

    /// If true, the table is offline and cannot be written to.
    /// This flag is managed by RestartingThread.
    std::atomic_bool is_readonly {true};
    std::atomic_uint32_t readonly_start_time{0};

    /// If nullopt - ZooKeeper is not available, so we don't know if there is table metadata.
    /// If false - ZooKeeper is available, but there is no table metadata. It's safe to drop table in this case.
    std::optional<bool> has_metadata_in_zookeeper;

    bool is_readonly_metric_set = false;

    const TableZnodeInfo zookeeper_info;
    const String zookeeper_path; // shorthand for zookeeper_info.path

    const String replica_name; // shorthand for zookeeper_info.replica_name
    const String replica_path;

    ZooKeeperRetriesInfo create_query_zookeeper_retries_info TSA_GUARDED_BY(create_query_zookeeper_retries_info_mutex);
    mutable std::mutex create_query_zookeeper_retries_info_mutex;

    /** /replicas/me/is_active.
      */
    zkutil::EphemeralNodeHolderPtr replica_is_active_node;

    /** Is this replica "leading". The leader replica selects the parts to merge.
      * It can be false only when old ClickHouse versions are working on the same cluster, because now we allow multiple leaders.
      */
    std::atomic<bool> is_leader {false};

    InterserverIOEndpointPtr data_parts_exchange_endpoint;

    MergeTreeDataSelectExecutor reader;
    MergeTreeDataWriter writer;
    MergeTreeDataMergerMutator merger_mutator;

    MergeStrategyPicker merge_strategy_picker;

    /** The queue of what needs to be done on this replica to catch up with everyone. It is taken from ZooKeeper (/replicas/me/queue/).
     * In ZooKeeper the entries are in chronological order; here that order is not necessarily preserved.
     */
    ReplicatedMergeTreeQueue queue;
    std::atomic<time_t> last_queue_update_start_time{0};
    std::atomic<time_t> last_queue_update_finish_time{0};

    mutable std::mutex last_queue_update_exception_lock;
    String last_queue_update_exception;
    String getLastQueueUpdateException() const;

    DataPartsExchange::Fetcher fetcher;

    /// When set, the replica is initialized and the startup() method can exit
    Poco::Event startup_event;

    /// Do I need to complete background threads (except restarting_thread)?
    std::atomic<bool> partial_shutdown_called {false};

    /// Event that is signalled (and is reset) by the restarting_thread when the ZooKeeper session expires.
    Poco::Event partial_shutdown_event {false};     /// Poco::Event::EVENT_MANUALRESET

    std::atomic<bool> shutdown_called {false};
    std::atomic<bool> shutdown_prepared_called {false};
    std::optional<ShutdownDeadline> shutdown_deadline;

    /// We call flushAndPrepareForShutdown before acquiring DDLGuard, so we can shutdown a table that is being created right now
    mutable std::mutex flush_and_shutdown_mutex;


    mutable std::mutex last_sent_parts_mutex;
    std::condition_variable last_sent_parts_cv;
    std::deque<MergeTreePartInfo> last_sent_parts;

    /// Threads.
    ///

    /// A task that keeps track of the updates in the logs of all replicas and loads them into the queue.
    bool queue_update_in_progress = false;
    BackgroundSchedulePoolTaskHolder queue_updating_task;

    BackgroundSchedulePoolTaskHolder mutations_updating_task;
    Coordination::WatchCallbackPtr mutations_watch_callback;

    /// A task that selects parts to merge.
    BackgroundSchedulePoolTaskHolder merge_selecting_task;
    /// It is acquired for each iteration of the selection of parts to merge or each OPTIMIZE query.
    std::mutex merge_selecting_mutex;

    UInt64 merge_selecting_sleep_ms;

    /// A task that marks finished mutations as done.
    BackgroundSchedulePoolTaskHolder mutations_finalizing_task;

    /// A thread that removes old parts, log entries, and blocks.
    ReplicatedMergeTreeCleanupThread cleanup_thread;

    AsyncBlockIDsCache<StorageReplicatedMergeTree> async_block_ids_cache;

    /// A thread that checks the data of the parts, as well as the queue of the parts to be checked.
    ReplicatedMergeTreePartCheckThread part_check_thread;

    /// A thread that processes reconnection to ZooKeeper when the session expires.
    ReplicatedMergeTreeRestartingThread restarting_thread;
    EventNotifier::HandlerPtr session_expired_callback_handler;

    /// A thread that attaches the table using ZooKeeper
    std::optional<ReplicatedMergeTreeAttachThread> attach_thread;

    PartMovesBetweenShardsOrchestrator part_moves_between_shards_orchestrator;

    std::atomic<bool> initialization_done{false};

    /// True if the replica was created for an existing table with fixed granularity
    bool other_replicas_fixed_granularity = false;

    /// Throttlers used in DataPartsExchange to limit the maximum fetch/send
    /// speed.
    ThrottlerPtr replicated_fetches_throttler;
    ThrottlerPtr replicated_sends_throttler;

    /// Global ID, synced via ZooKeeper between replicas
    mutable std::mutex table_shared_id_mutex;
    mutable UUID table_shared_id;

    std::mutex last_broken_disks_mutex;
    std::set<String> last_broken_disks;

    std::mutex existing_zero_copy_locks_mutex;

    struct ZeroCopyLockDescription
    {
        std::string replica;
        std::shared_ptr<std::atomic<bool>> exists;
    };

    std::unordered_map<String, ZeroCopyLockDescription> existing_zero_copy_locks;

    void readLocalImpl(
        QueryPlan & query_plan,
        const Names & column_names,
        const StorageSnapshotPtr & storage_snapshot,
        SelectQueryInfo & query_info,
        ContextPtr local_context,
        size_t max_block_size,
        size_t num_streams);

    void readLocalSequentialConsistencyImpl(
        QueryPlan & query_plan,
        const Names & column_names,
        const StorageSnapshotPtr & storage_snapshot,
        SelectQueryInfo & query_info,
        ContextPtr local_context,
        size_t max_block_size,
        size_t num_streams);

    void readParallelReplicasImpl(
        QueryPlan & query_plan,
        const Names & column_names,
        SelectQueryInfo & query_info,
        ContextPtr local_context,
        QueryProcessingStage::Enum processed_stage);

    template <class Func>
    void foreachActiveParts(Func && func, bool select_sequential_consistency) const;

    /** Creates the minimum set of nodes in ZooKeeper and creates the first replica.
      * Returns true if it was created, false if it already exists.
      */
    bool createTableIfNotExists(const StorageMetadataPtr & metadata_snapshot, const ZooKeeperRetriesInfo & zookeeper_retries_info) const;
    bool createTableIfNotExistsAttempt(const StorageMetadataPtr & metadata_snapshot, QueryStatusPtr process_list_element) const;

    /**
     * Creates a replica in ZooKeeper and adds to the queue all that it takes to catch up with the rest of the replicas.
     */
    void createReplica(const StorageMetadataPtr & metadata_snapshot, const ZooKeeperRetriesInfo & zookeeper_retries_info) const;
    void createReplicaAttempt(const StorageMetadataPtr & metadata_snapshot, QueryStatusPtr process_list_element) const;

    /** Create nodes in ZooKeeper that must always exist but might be missing because older versions of the server were running.
      */
    void createNewZooKeeperNodes(const ZooKeeperRetriesInfo & zookeeper_retries_info) const;
    void createNewZooKeeperNodesAttempt() const;

    /// Returns the ZooKeeper retries info specified for the CREATE TABLE query which is creating and starting this table right now.
    ZooKeeperRetriesInfo getCreateQueryZooKeeperRetriesInfo() const;
    void clearCreateQueryZooKeeperRetriesInfo();

    bool checkTableStructure(const String & zookeeper_prefix, const StorageMetadataPtr & metadata_snapshot, int32_t * metadata_version, bool strict_check,
                             const ZooKeeperRetriesInfo & zookeeper_retries_info) const;
    bool checkTableStructureAttempt(const String & zookeeper_prefix, const StorageMetadataPtr & metadata_snapshot, int32_t * metadata_version, bool strict_check) const;

    /// A part of ALTER: apply metadata changes only (data parts are altered separately).
    /// Must be called under IStorage::lockForAlter() lock.
    void setTableStructure(
        const StorageID & table_id, const ContextPtr & local_context,
        ColumnsDescription new_columns, const ReplicatedMergeTreeTableMetadata::Diff & metadata_diff,
        int32_t new_metadata_version);

    /** Check that the set of parts corresponds to that in ZK (/replicas/me/parts/).
      * If any parts described in ZK are missing locally, throw an exception.
      * If any local parts are not mentioned in ZK, remove them.
      *  But if there are too many, throw an exception just in case - it's probably a configuration error.
      */
    void checkParts(bool skip_sanity_checks);
    bool checkPartsImpl(bool skip_sanity_checks);

    /// Synchronize the list of part uuids which are currently pinned. These should be sent to root query executor
    /// to be used for deduplication.
    void syncPinnedPartUUIDs(const ZooKeeperRetriesInfo & zookeeper_retries_info);

    /** Check that the part's checksum is the same as the checksum of the same part on some other replica.
      * If no one has such a part, nothing is checked.
      * Not very reliable: if two replicas add a part almost at the same time, no checks will occur.
      * Adds actions to `ops` that add data about the part into ZooKeeper.
      * Call under lockForShare.
      */
    bool checkPartChecksumsAndAddCommitOps(
        const ZooKeeperWithFaultInjectionPtr & zookeeper,
        const DataPartPtr & part,
        Coordination::Requests & ops,
        String part_name,
        NameSet & absent_replicas_paths);

    String getChecksumsForZooKeeper(const MergeTreeDataPartChecksums & checksums) const;

    bool getOpsToCheckPartChecksumsAndCommit(const ZooKeeperWithFaultInjectionPtr & zookeeper, const MutableDataPartPtr & part,
                                             std::optional<HardlinkedFiles> hardlinked_files, bool replace_zero_copy_lock,
                                             Coordination::Requests & ops, size_t & num_check_ops);

    /// Accepts a PreActive part, atomically checks its checksums against those on other replicas and commits the part
    DataPartsVector checkPartChecksumsAndCommit(Transaction & transaction, const MutableDataPartPtr & part, std::optional<HardlinkedFiles> hardlinked_files = {}, bool replace_zero_copy_lock=false);

    bool partIsAssignedToBackgroundOperation(const DataPartPtr & part) const override;

    void getCommitPartOps(Coordination::Requests & ops, const DataPartPtr & part, const String & block_id_path = "") const;

    void getCommitPartOps(Coordination::Requests & ops, const DataPartPtr & part, const std::vector<String> & block_id_paths) const;

    /// Adds actions to `ops` that remove a part from ZooKeeper.
    /// Set has_children to true for "old-style" parts (those with /columns and /checksums child znodes).
    void getRemovePartFromZooKeeperOps(const String & part_name, Coordination::Requests & ops, bool has_children);

    /// Quickly removes a big set of parts from ZooKeeper (using async multi queries)
    void removePartsFromZooKeeper(zkutil::ZooKeeperPtr & zookeeper, const Strings & part_names,
                                  NameSet * parts_should_be_retried = nullptr);

    /// Remove parts from ZooKeeper, throw exception if unable to do so after max_retries.
    void removePartsFromZooKeeperWithRetries(const Strings & part_names, size_t max_retries = 5);
    void removePartsFromZooKeeperWithRetries(PartsToRemoveFromZooKeeper & parts, size_t max_retries = 5);

    void forcefullyRemoveBrokenOutdatedPartFromZooKeeperBeforeDetaching(const String & part_name) override;

    void paranoidCheckForCoveredPartsInZooKeeperOnStart(const Strings & parts_in_zk, const Strings & parts_to_fetch) const;

    /// Removes a part from ZooKeeper and adds a task to the queue to download it. It is supposed to do this with broken parts.
    void removePartAndEnqueueFetch(const String & part_name, bool storage_init);

    /// Running jobs from the queue.

    /** Execute the action from the queue. Throws an exception if something is wrong.
      * Returns whether or not it succeeded. If it did not succeed, the entry is written to the end of the queue.
      */
    bool executeLogEntry(LogEntry & entry);

    /// Look up the part for the entry in the detached/ folder.
    /// Returns nullptr if the part is corrupt or missing.
    MutableDataPartPtr attachPartHelperFoundValidPart(const LogEntry& entry, PartsTemporaryRename & rename_parts) const;

    void executeDropRange(const LogEntry & entry);

    /// Execute an alter of table metadata. Sets the replica/metadata and replica/columns
    /// nodes in ZooKeeper and also changes the in-memory metadata.
    /// The new metadata and columns values are stored in the entry.
    bool executeMetadataAlter(const LogEntry & entry);

    /// Fetch a part from another replica (inserted or merged/mutated).
    /// NOTE: Attention! First of all it tries to find a covering part on another replica
    /// and sets it into entry.actual_new_part_name. After that it tries to fetch this new covering part.
    /// If the fetch was not successful, it clears entry.actual_new_part_name.
    bool executeFetch(LogEntry & entry, bool need_to_check_missing_part=true);

    bool executeReplaceRange(LogEntry & entry);
    void executeClonePartFromShard(const LogEntry & entry);

    /** Updates the queue.
      */
    void queueUpdatingTask();

    void mutationsUpdatingTask();

    /** Clone data from another replica.
      * If the replica cannot be cloned, throws an exception.
      */
    void cloneReplica(const String & source_replica, Coordination::Stat source_is_lost_stat, zkutil::ZooKeeperPtr & zookeeper);

    /// Repairs the metadata of a stale replica. Called from cloneReplica(...)
    void cloneMetadataIfNeeded(const String & source_replica, const String & source_path, zkutil::ZooKeeperPtr & zookeeper);

    /// Clone replica if it is lost.
    void cloneReplicaIfNeeded(zkutil::ZooKeeperPtr zookeeper);


    ReplicatedMergeTreeQueue::SelectedEntryPtr selectQueueEntry();


    MergeFromLogEntryTaskPtr getTaskToProcessMergeQueueEntry(ReplicatedMergeTreeQueue::SelectedEntryPtr entry);

    bool processQueueEntry(ReplicatedMergeTreeQueue::SelectedEntryPtr entry);

    /// Start being leader (if not disabled by setting).
    /// Since multi-leaders are allowed, it just sets is_leader flag.
    void startBeingLeader(const ZooKeeperRetriesInfo & zookeeper_retries_info);
    void stopBeingLeader();

    /** Selects the parts to merge and writes to the log.
      */
    void mergeSelectingTask();

    /// Checks if some mutations are done and marks them as done.
    void mutationsFinalizingTask();

    /** Write the selected parts to merge into the log.
      * Call with merge_selecting_mutex locked.
      * Returns MissingPart if any part is not in ZK.
      */
    enum class CreateMergeEntryResult : uint8_t { Ok, MissingPart, LogUpdated, Other };

    CreateMergeEntryResult createLogEntryToMergeParts(
        zkutil::ZooKeeperPtr & zookeeper,
        const DataPartsVector & parts,
        const DataPartsVector & patch_parts,
        const String & merged_name,
        const UUID & merged_part_uuid,
        const MergeTreeDataPartFormat & merged_part_format,
        bool deduplicate,
        const Names & deduplicate_by_columns,
        bool cleanup,
        ReplicatedMergeTreeLogEntryData * out_log_entry,
        int32_t log_version,
        MergeType merge_type);

    CreateMergeEntryResult createLogEntryToMutatePart(
        const IMergeTreeDataPart & part,
        const UUID & new_part_uuid,
        Int64 mutation_version,
        int32_t alter_version,
        int32_t log_version);

    /** Returns an empty string if no replica has the part.
      */
    String findReplicaHavingPart(const String & part_name, bool active);
    static String findReplicaHavingPart(const String & part_name, const String & zookeeper_path_, zkutil::ZooKeeper::Ptr zookeeper_);

    bool checkReplicaHavePart(const String & replica, const String & part_name);
    bool checkIfDetachedPartExists(const String & part_name);
    bool checkIfDetachedPartitionExists(const String & partition_name);

    /** Find a replica having the specified part or any part that covers it.
      * If active = true, consider only active replicas.
      * If found, returns the replica name and sets 'entry->actual_new_part_name' to the name of the largest covering part found.
      * If not found, returns an empty string.
      */
    String findReplicaHavingCoveringPart(LogEntry & entry, bool active);
    bool findReplicaHavingCoveringPart(const String & part_name, bool active);
    String findReplicaHavingCoveringPartImplLowLevel(LogEntry * entry, const String & part_name, String & found_part_name, bool active);
    static std::set<MergeTreePartInfo> findReplicaUniqueParts(const String & replica_name_, const String & zookeeper_path_, MergeTreeDataFormatVersion format_version_, zkutil::ZooKeeper::Ptr zookeeper_, LoggerPtr log_);

    /** Download the specified part from the specified replica.
      * If `to_detached`, the part is placed in the `detached` directory.
      * If quorum != 0, then the node for tracking the quorum is updated.
      * Returns false if the part is already being fetched right now.
      */
    bool fetchPart(
        const String & part_name,
        const StorageMetadataPtr & metadata_snapshot,
        const String & source_zookeeper_name,
        const String & source_replica_path,
        bool to_detached,
        size_t quorum,
        zkutil::ZooKeeper::Ptr zookeeper_ = nullptr,
        bool try_fetch_shared = true);

    /** Download the specified part from the specified replica.
      * Used to replace a local part with the same S3-shared part in hybrid storage.
      * Returns nullptr if the part is already being fetched right now.
      */
    MutableDataPartPtr fetchExistsPart(
        const String & part_name,
        const StorageMetadataPtr & metadata_snapshot,
        const String & replica_path,
        DiskPtr replaced_disk,
        String replaced_part_path);

    /// Required only to avoid races between executeLogEntry and fetchPartition
    std::unordered_set<String> currently_fetching_parts;
    mutable std::mutex currently_fetching_parts_mutex;

    /// With the quorum being tracked, add a replica to the quorum for the part.
    void updateQuorum(const String & part_name, bool is_parallel);

    /// Deletes info from quorum/last_part node for particular partition_id.
    void cleanLastPartNode(const String & partition_id);

    /// Part name is stored in quorum/last_part for corresponding partition_id.
    bool partIsLastQuorumPart(const MergeTreePartInfo & part_info) const;

    /// The part is currently being inserted with quorum (the node quorum/parallel/part_name exists)
    bool partIsInsertingWithParallelQuorum(const MergeTreePartInfo & part_info) const;

    /// Creates a new block number if a block with such block_id does not exist.
    /// If zookeeper_path_prefix is specified, then the block number is allocated on this path
    /// (can be used if we want to allocate block numbers on other replicas)
    std::optional<EphemeralLockInZooKeeper> allocateBlockNumber(
        const String & partition_id,
        const zkutil::ZooKeeperPtr & zookeeper,
        const String & zookeeper_block_id_path = "",
        const String & zookeeper_path_prefix = "",
        const std::optional<String> & znode_data = std::nullopt) const;

    template<typename T>
    std::optional<EphemeralLockInZooKeeper> allocateBlockNumber(
        const String & partition_id,
        const ZooKeeperWithFaultInjectionPtr & zookeeper,
        const T & zookeeper_block_id_path,
        const String & zookeeper_path_prefix = "",
        const std::optional<String> & znode_data = std::nullopt) const;

    /** Wait until all replicas, including this one, execute the specified action from the log.
      * If replicas are added at the same time, it may not wait for the added replica.
      *
      * Waits for inactive replicas no longer than wait_for_inactive_timeout.
      * Returns a list of inactive replicas that have not executed the entry, or throws an exception.
      *
      * NOTE: This method must be called without the table lock held,
      * because it effectively waits for another thread that usually also has to acquire the lock to proceed, and this yields a deadlock.
      */
    void waitForAllReplicasToProcessLogEntry(const String & table_zookeeper_path, const ReplicatedMergeTreeLogEntryData & entry,
                                             Int64 wait_for_inactive_timeout, const String & error_context = {});
    Strings tryWaitForAllReplicasToProcessLogEntry(const String & table_zookeeper_path, const ReplicatedMergeTreeLogEntryData & entry,
                                                   Int64 wait_for_inactive_timeout);

    /** Wait until the specified replica executes the specified action from the log.
      * NOTE: See comment about locks above.
      */
    bool tryWaitForReplicaToProcessLogEntry(const String & table_zookeeper_path, const String & replica_name,
                                            const ReplicatedMergeTreeLogEntryData & entry, Int64 wait_for_inactive_timeout = 0);

    /// Depending on settings, do nothing or wait for this replica or all replicas process log entry.
    void waitForLogEntryToBeProcessedIfNecessary(const ReplicatedMergeTreeLogEntryData & entry, ContextPtr query_context, const String & error_context = {});

    /// Throw an exception if the table is readonly.
    void assertNotReadonly() const;

    /// Throw an exception if the table is readonly because it's a static storage.
    void assertNotStaticStorage() const;

    /// Produce an imaginary part info covering all parts in the specified partition (at the call moment).
    /// Returns false if the partition doesn't exist yet.
    /// The caller must hold delimiting_block_lock until the creation of the drop/replace entry in the log.
    /// Otherwise some replica may assign a merge that intersects part_info.
    bool getFakePartCoveringAllPartsInPartition(const String & partition_id, MergeTreePartInfo & part_info,
                                                std::optional<EphemeralLockInZooKeeper> & delimiting_block_lock, bool for_replace_range = false);

    /// Check for a node in ZK. If it exists, remember this information and answer true immediately next time (without asking ZK).
    mutable std::unordered_set<std::string> existing_nodes_cache;
    mutable std::mutex existing_nodes_cache_mutex;
    bool existsNodeCached(const ZooKeeperWithFaultInjectionPtr & zookeeper, const std::string & path) const;

    /// Cancels INSERTs in the block range by removing ephemeral block numbers
    void clearLockedBlockNumbersInPartition(zkutil::ZooKeeper & zookeeper, const String & partition_id, Int64 min_block_num, Int64 max_block_num);

    void getClearBlocksInPartitionOps(Coordination::Requests & ops, zkutil::ZooKeeper & zookeeper, const String & partition_id, Int64 min_block_num, Int64 max_block_num);

    void getClearBlocksInPartitionOpsImpl(Coordination::Requests & ops, zkutil::ZooKeeper & zookeeper, const String & partition_id, Int64 min_block_num, Int64 max_block_num, const String & blocks_dir_name);
    /// Remove block IDs from `blocks/` in ZooKeeper for the given partition ID in the given block number range.
    void clearBlocksInPartition(
        zkutil::ZooKeeper & zookeeper, const String & partition_id, Int64 min_block_num, Int64 max_block_num);

    /// Info about how other replicas can access this one.
    ReplicatedMergeTreeAddress getReplicatedMergeTreeAddress() const;

    bool addOpsToDropAllPartsInPartition(
        zkutil::ZooKeeper & zookeeper, const String & partition_id, bool detach,
        Coordination::Requests & ops, std::vector<LogEntryPtr> & entries,
        std::vector<EphemeralLockInZooKeeper> & delimiting_block_locks,
        std::vector<size_t> & log_entry_ops_idx);

    void dropPartitions(const zkutil::ZooKeeperPtr & zookeeper, const Strings & partition_ids, bool detach, ContextPtr query_context);
    void dropAllPartsInPartitions(zkutil::ZooKeeper & zookeeper, const Strings & partition_ids, std::vector<LogEntryPtr> & entries, ContextPtr query_context, bool detach);

    void dropPartNoWaitNoThrow(const String & part_name) override;
    void dropPart(const String & part_name, bool detach, ContextPtr query_context) override;

    // Partition helpers
    void dropPartition(const ASTPtr & partition, bool detach, ContextPtr query_context) override;
    PartitionCommandsResultInfo attachPartition(const ASTPtr & partition, const StorageMetadataPtr & metadata_snapshot, bool part, ContextPtr query_context) override;
    void replacePartitionFrom(const StoragePtr & source_table, const ASTPtr & partition, bool replace, ContextPtr query_context) override;
    void movePartitionToTable(const StoragePtr & dest_table, const ASTPtr & partition, ContextPtr query_context) override;
    void movePartitionToShard(const ASTPtr & partition, bool move_part, const String & to, ContextPtr query_context) override;
    CancellationCode killPartMoveToShard(const UUID & task_uuid) override;
    void fetchPartition(
        const ASTPtr & partition,
        const StorageMetadataPtr & metadata_snapshot,
        const String & from,
        bool fetch_part,
        ContextPtr query_context) override;
    void forgetPartition(const ASTPtr & partition, ContextPtr query_context) override;


    /// NOTE: there are no guarantees for concurrent merges. Dropping part can
    /// be concurrently merged into some covering part and dropPart will do
    /// nothing. There are some fundamental problems with it. But this is OK
    /// because:
    ///
    /// dropPart is used in the following cases:
    /// 1) Remove empty parts after TTL.
    /// 2) Remove parts after move between shards.
    /// 3) User queries: ALTER TABLE DROP PART 'part_name'.
    ///
    /// In the first case, a merge of empty parts is even better than DROP. In the
    /// second case, part UUIDs are used to forbid merges of moving parts, so there
    /// is no problem with concurrent merges. The third case is quite rare and
    /// we give very weak guarantee: there will be no active part with this
    /// name, but possibly it was merged to some other part.
    ///
    /// NOTE: don't rely on dropPart if you 100% need to remove a non-empty part
    /// and don't use any explicit locking mechanism for merges.
    bool dropPartImpl(zkutil::ZooKeeperPtr & zookeeper, String part_name, LogEntry & entry, bool detach, bool throw_if_noop);

    /// Check the granularity of an already existing replicated table in ZooKeeper, if it exists.
    /// Returns true if it's fixed.
    bool checkFixedGranularityInZookeeper(const ZooKeeperRetriesInfo & zookeeper_retries_info) const;

    /// Wait (up to a timeout) until the mutation is finished on the replicas
    void waitMutationToFinishOnReplicas(
        const Strings & replicas, const String & mutation_id) const;

    MutationsSnapshotPtr getMutationsSnapshot(const IMutationsSnapshot::Params & params) const override;

    void startBackgroundMovesIfNeeded() override;

    /// Attaches restored parts to the storage.
    void attachRestoredParts(MutableDataPartsVector && parts) override;

    std::unique_ptr<MergeTreeSettings> getDefaultSettings() const override;

    PartitionBlockNumbersHolder allocateBlockNumbersInAffectedPartitions(
        const MutationCommands & commands,
        CommittingBlock::Op op,
        ContextPtr query_context,
        const zkutil::ZooKeeperPtr & zookeeper) const;

    static Strings getZeroCopyPartPath(const MergeTreeSettings & settings, const std::string & disk_type, const String & table_uuid,
        const String & part_name, const String & zookeeper_path_old, const ContextPtr & local_context);

    static void createZeroCopyLockNode(
        const ZooKeeperWithFaultInjectionPtr & zookeeper, const String & zookeeper_node,
        int32_t mode = zkutil::CreateMode::Persistent, bool replace_existing_lock = false,
        const String & path_to_set_hardlinked_files = "", const NameSet & hardlinked_files = {});

    static void getZeroCopyLockNodeCreateOps(
        const ZooKeeperWithFaultInjectionPtr & zookeeper, const String & zookeeper_node, Coordination::Requests & requests,
        int32_t mode = zkutil::CreateMode::Persistent, bool replace_existing_lock = false,
        const String & path_to_set_hardlinked_files = "", const NameSet & hardlinked_files = {});


    bool removeDetachedPart(DiskPtr disk, const String & path, const String & part_name) override;

    /// Create freeze metadata for the table and save it in ZooKeeper. Required only if zero-copy replication is enabled.
    void createAndStoreFreezeMetadata(DiskPtr disk, DataPartPtr part, String backup_part_path) const override;

    // Create table id if needed
    void createTableSharedID(const ZooKeeperRetriesInfo & zookeeper_retries_info) const;
    void createTableSharedIDAttempt() const;

    bool checkZeroCopyLockExists(const String & part_name, const DiskPtr & disk, String & lock_replica);
    void watchZeroCopyLock(const String & part_name, const DiskPtr & disk);

    std::optional<String> getZeroCopyPartPath(const String & part_name, const DiskPtr & disk);

    /// Create an ephemeral lock in ZooKeeper for a part and disk that support zero-copy replication.
    /// If there is no connection to ZooKeeper, or on shutdown or readonly -- return std::nullopt.
    /// If somebody is already holding the lock -- return an unlocked ZeroCopyLock object (not std::nullopt).
    std::optional<ZeroCopyLock> tryCreateZeroCopyExclusiveLock(const String & part_name, const DiskPtr & disk) override;

    /// Wait for the ephemeral lock to disappear. Returns true if the table is shut down/readonly, the timeout is exceeded, etc.,
    /// or if the node actually disappeared.
    bool waitZeroCopyLockToDisappear(const ZeroCopyLock & lock, size_t milliseconds_to_wait) override;
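
    /// A minimal usage sketch of the two methods above (hypothetical caller; the isLocked() check on
    /// ZeroCopyLock is assumed here for illustration):
    ///
    ///     if (auto lock = tryCreateZeroCopyExclusiveLock(part_name, disk))
    ///     {
    ///         if (!lock->isLocked())
    ///         {
    ///             // somebody else holds the lock: wait for it to disappear, then retry
    ///             waitZeroCopyLockToDisappear(*lock, /* milliseconds_to_wait */ 1000);
    ///         }
    ///     }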

    void startupImpl(bool from_attach_thread, const ZooKeeperRetriesInfo & zookeeper_retries_info);

    std::vector<String> getZookeeperZeroCopyLockPaths() const;
    static void dropZookeeperZeroCopyLockPaths(zkutil::ZooKeeperPtr zookeeper,
                                               std::vector<String> zero_copy_locks_paths, LoggerPtr logger);

    struct DataValidationTasks : public IStorage::DataValidationTasksBase
    {
        explicit DataValidationTasks(DataPartsVector && parts_, std::unique_lock<std::mutex> && parts_check_lock_)
            : parts_check_lock(std::move(parts_check_lock_)), parts(std::move(parts_)), it(parts.begin())
        {}

        DataPartPtr next()
        {
            std::lock_guard lock(mutex);
            if (it == parts.end())
                return nullptr;
            return *(it++);
        }

        size_t size() const override
        {
            std::lock_guard lock(mutex);
            return std::distance(it, parts.end());
        }

        std::unique_lock<std::mutex> parts_check_lock;

        mutable std::mutex mutex;
        DataPartsVector parts;
        DataPartsVector::const_iterator it;
    };

    const String TMP_PREFIX_REPLACE_PARTITION_FROM = "tmp_replace_from_";
    std::unique_ptr<ReplicatedMergeTreeLogEntryData> replacePartitionFromImpl(
        const Stopwatch & watch,
        ProfileEventsScope & profile_events_scope,
        const StorageMetadataPtr & metadata_snapshot,
        const MergeTreeData & src_data,
        const String & partition_id,
        const zkutil::ZooKeeperPtr & zookeeper,
        bool replace,
        const bool & zero_copy_enabled,
        const bool & always_use_copy_instead_of_hardlinks,
        const ContextPtr & query_context);
};

String getPartNamePossiblyFake(MergeTreeDataFormatVersion format_version, const MergeTreePartInfo & part_info);


/** There are three places where each part should be:
  * 1. In the RAM, data_parts, all_data_parts.
  * 2. In the filesystem (FS), the directory with the data of the table.
  * 3. In ZooKeeper (ZK).
  *
  * When adding a part, it must be added immediately to these three places.
  * This is done as follows:
  * - [FS] first write the part into a temporary directory on the filesystem;
  * - [FS] rename the temporary part to the result on the filesystem;
  * - [RAM] immediately afterwards add it to the `data_parts`, and remove from `data_parts` any parts covered by this one;
  * - [RAM] also set up the `Transaction` object, which, in case of an exception (in the next point),
  *   rolls back the changes in `data_parts` (from the previous point);
  * - [ZK] then send a transaction (multi) to add a part to ZooKeeper (and some more actions);
  * - [FS, ZK] by the way, removing the covered (old) parts from the filesystem, from ZooKeeper and from `all_data_parts`
  *   is delayed by a few minutes.
  *
  * There is no atomicity here.
  * It could be possible to achieve atomicity using undo/redo logs and a flag in `DataPart` when it is completely ready.
  * But it would be inconvenient - I would have to write undo/redo logs for each `Part` in ZK, and this would increase the already large number of interactions.
  *
  * Instead, we are forced to work in a situation where at any time
  *  (from another thread, or after server restart), there may be an unfinished transaction.
  *  (note - for this the part should be in RAM)
  * From these cases the most frequent one is when the part is already in the data_parts, but it's not yet in ZooKeeper.
  * This case must be distinguished from the case where such a situation is achieved due to some kind of damage to the state.
  *
  * We do this with a time threshold.
  * If the part is young enough, its absence from ZooKeeper will be perceived optimistically - as if it simply did not have time to be added there yet
  *  - as if the transaction has not yet been executed, but will soon be.
  * And if the part is old, its absence from ZooKeeper will be perceived as an unfinished transaction that needs to be rolled back.
  *
  * PS. Perhaps it would be better to add a flag to the DataPart that a part is inserted into ZK.
  * But here it's too easy to get confused with the consistency of this flag.
  */
/// NOLINTNEXTLINE
#define MAX_AGE_OF_LOCAL_PART_THAT_WASNT_ADDED_TO_ZOOKEEPER (5 * 60)
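
/** A sketch of the threshold rule described above (illustrative only; the actual check lives in the
  * part loading / consistency checking code, and the field names used here are assumptions):
  *
  *   if (!part_exists_in_zookeeper)
  *   {
  *       if (time(nullptr) - part->modification_time <= MAX_AGE_OF_LOCAL_PART_THAT_WASNT_ADDED_TO_ZOOKEEPER)
  *           ; // young part: assume the insert transaction is still in flight
  *       else
  *           ; // old part: treat it as an unfinished transaction and roll it back
  *   }
  */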

}
